//package com.example.demo.kafka;
//
//import org.apache.commons.lang3.time.DateFormatUtils;
//import org.apache.flink.api.common.eventtime.*;
//import org.apache.flink.api.java.functions.KeySelector;
//import org.apache.flink.api.java.operators.DataSource;
//import org.apache.flink.api.java.tuple.Tuple2;
//import org.apache.flink.api.java.tuple.Tuple3;
//import org.apache.flink.streaming.api.TimeCharacteristic;
//import org.apache.flink.streaming.api.datastream.DataStream;
//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
//import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
//import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
//import org.apache.flink.streaming.api.windowing.time.Time;
//import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
//import org.apache.flink.util.Collector;
//import java.util.Date;
//import java.util.Iterator;
//import java.util.List;
//
///**
// * @Description Watermarks水印：为输入的数据流的设置一个时间事件（时间戳），对窗口内的数据输入流无序与延迟提供解决方案
// */
//public class TimestampsAndWatermarks {
//
//    /**
//     * 官方文档：https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html
//     */
//
//    /**
//     * 遍历集合，分别打印不同性别的信息，对于执行超时，自动触发定时器
//     * @param args
//     * @throws Exception
//     */
//    public static void main(String[] args) throws Exception {
//        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        /*
//        TimeCharacteristic有三种时间类型：
//            ProcessingTime：以operator处理的时间为准，它使用的是机器的系统时间来作为data stream的时间；
//            IngestionTime：以数据进入flink streaming data flow的时间为准；
//            EventTime：以数据自带的时间戳字段为准，应用程序需要指定如何从record中抽取时间戳字段；需要实现assignTimestampsAndWatermarks方法，并设置时间水位线；
//         */
//        //使用event time，需要指定事件的时间戳
//        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//        env.setParallelism(1);
//
//        //设置自动生成水印的时间周期，避免数据流量大的情况下，频繁添加水印导致计算性能降低。
//        env.getConfig().setAutoWatermarkInterval(1000L);
////        List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
//
//        DataStream<Tuple3<String, String, Integer>> inStream = env.addSource(new MyRichSourceFunction());
//        DataStream<Tuple2<String, Integer>> dataStream = inStream
//                //为一个水位线，这个Watermarks在不断的变化，一旦Watermarks大于了某个window的end_time，就会触发此window的计算，Watermarks就是用来触发window计算的。
//                //Duration.ofSeconds(2)，到数据流到达flink后，再水位线中设置延迟时间，也就是在所有数据流的最大的事件时间比window窗口结束时间大或相等时，再延迟多久触发window窗口结束；
////                .assignTimestampsAndWatermarks(
////                        WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
////                                .withTimestampAssigner((element, timestamp) -> {
////                                    long times = System.currentTimeMillis() ;
////                                    System.out.println(element.f1 + ","+ element.f0 + "的水位线为：" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
////                                    return times;
////                                })
////                )
//                .assignTimestampsAndWatermarks(new MyWatermarkStrategy()
//                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Integer>>() {
//                            @Override
//                            public long extractTimestamp(Tuple3<String, String, Integer> element, long timestamp) {
//                                long times = System.currentTimeMillis();
//                                System.out.println(element.f1 + "," + element.f0 + "的水位线为：" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
//                                return times;
//                            }
//                        }))
//                //分区窗口
//                .keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k -> k.f1)
//                //触发3s滚动窗口
//                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
//                //执行窗口数据，对keyBy数据流批量处理
//                .apply(new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>(){
//                    @Override
//                    public void apply(String s, TimeWindow window, Iterable<Tuple3<String, String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
//                        long times = System.currentTimeMillis() ;
//                        System.out.println();
//                        System.out.println("窗口处理时间：" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
//                        Iterator<Tuple3<String, String, Integer>> iterator = input.iterator();
//                        int total = 0;
//                        int size = 0;
//                        String sex = "";
//                        while (iterator.hasNext()){
//                            Tuple3<String, String, Integer> tuple3 = iterator.next();
//                            total += tuple3.f2;
//                            size ++;
//                            sex = tuple3.f1;
//                        }
//                        out.collect(new Tuple2<>(sex, total / size));
//                    }
//                });
//
//        dataStream.print();
//        env.execute("flink Filter job");
//    }
//
//    /**
//     * 定期水印生成器
//     */
//    public static class MyWatermarkStrategy implements WatermarkStrategy<Tuple3<String, String, Integer>>{
//        @Override
//        public WatermarkGenerator<Tuple3<String, String, Integer>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
//            return new WatermarkGenerator<Tuple3<String, String, Integer>>() {
//                //设置固定的延迟量3.5 seconds
//                private final long maxOutOfOrderness = 3500;
//                private long currentMaxTimestamp;
//
//                /**
//                 * 事件处理
//                 * @param event             数据流对象
//                 * @param eventTimestamp    事件水位线时间
//                 * @param output            输出
//                 */
//                @Override
//                public void onEvent(Tuple3<String, String, Integer> event, long eventTimestamp, WatermarkOutput output) {
//                    currentMaxTimestamp = Math.max(System.currentTimeMillis(), eventTimestamp);
//                }
//                @Override
//                public void onPeriodicEmit(WatermarkOutput output) {
//                    // 拿上一个水印时间 - 延迟量 = 等于给的窗口最终数据最后时间（如果在窗口到期内，未发生新的水印事件，则按window正常结束时间计算，当在最后水印时间-延迟量的时间范围内，有新的数据流进入，则会重新触发窗口内对全部数据流计算）
//                    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
//                }
//            };
//        }
//    }
//
//    /**
//     * 模拟数据持续输出
//     */
//    public static class MyRichSourceFunction extends RichSourceFunction<Tuple3<String, String, Integer>> {
//        @Override
//        public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
//            List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
//            int j = 0;
//            for (int i=0;i<100;i++){
//                if (i%6 == 0){
//                    j=0;
//                }
//                ctx.collect(tuple3List.get(j));
//                //1秒钟输出一个
//                Thread.sleep(1 * 1000);
//                j ++;
//            }
//        }
//        @Override
//        public void cancel() {
//            try{
//                super.close();
//            }catch (Exception e){
//                e.printStackTrace();
//            }
//        }
//    }
//
//}